---
title: Async Memory
description: Run Mem0 operations without blocking your event loop.
icon: "bolt"
---

`AsyncMemory` gives you a non-blocking interface to Mem0’s storage layer so Python applications can add, search, and manage memories directly from async code. Use it when you embed Mem0 inside FastAPI services, background workers, or any workflow that relies on `asyncio`.

<Info>
  **You’ll use this when…**
  - Your agent already runs in an async framework and you need memory calls to await cleanly.
  - You want to embed Mem0’s storage locally without sending requests through the synchronous client.
  - You plan to mix memory operations with other async APIs (OpenAI, HTTP calls, databases).
</Info>

<Warning>
  `AsyncMemory` expects a running event loop. Always call it inside `async def` functions or through helpers like `asyncio.run()` to avoid runtime errors.
</Warning>

<Note>
  Working in TypeScript? The Node SDK still uses synchronous calls—use `Memory` there and rely on Python’s `AsyncMemory` when you need awaited operations.
</Note>

## Feature anatomy

- **Direct storage access:** `AsyncMemory` talks to the same backends as the synchronous client but keeps everything in-process for lower latency.
- **Method parity:** Each memory operation (`add`, `search`, `get_all`, `delete`, etc.) mirrors the synchronous API, letting you reuse payload shapes.
- **Concurrent execution:** Non-blocking I/O lets you schedule multiple memory tasks with `asyncio.gather`.
- **Scoped organization:** Continue using `user_id`, `agent_id`, and `run_id` to separate memories across sessions and agents.

<AccordionGroup>
  <Accordion title="Async method parity">
    | Operation | Async signature | Notes |
    | --- | --- | --- |
    | Create memories | `await memory.add(...)` | Same arguments as synchronous `Memory.add`. |
    | Search memories | `await memory.search(...)` | Returns dict with `results`, identical shape. |
    | List memories | `await memory.get_all(...)` | Filter by `user_id`, `agent_id`, `run_id`. |
    | Retrieve memory | `await memory.get(memory_id=...)` | Raises `ValueError` if ID is invalid. |
    | Update memory | `await memory.update(memory_id=..., data=...)` | Accepts partial updates. |
    | Delete memory | `await memory.delete(memory_id=...)` | Returns confirmation payload. |
    | Delete in bulk | `await memory.delete_all(...)` | Requires at least one scope filter. |
    | History | `await memory.history(memory_id=...)` | Fetches change log for auditing. |
  </Accordion>
</AccordionGroup>

---

## Configure it

### Initialize the client

```python
import asyncio
from mem0 import AsyncMemory

# Default configuration
memory = AsyncMemory()

# Custom configuration
from mem0.configs.base import MemoryConfig
custom_config = MemoryConfig(
    # Your custom configuration here
)
memory = AsyncMemory(config=custom_config)
```

<Info icon="check">
  Run `await memory.search(...)` once right after initialization. If it returns memories without errors, your configuration works.
</Info>

<Tip>
  Keep configuration objects close to the async client so you can reuse them across workers without recreating vector store connections.
</Tip>

### Manage lifecycle and concurrency

```python
import asyncio
from contextlib import asynccontextmanager
from mem0 import AsyncMemory

@asynccontextmanager
async def get_memory():
    memory = AsyncMemory()
    try:
        yield memory
    finally:
        # Clean up resources if needed
        pass

async def safe_memory_usage():
    async with get_memory() as memory:
        return await memory.search("test query", user_id="alice")
```

<Tip>
  Wrap the client in an async context manager when you need a clean shutdown (for example, inside FastAPI startup/shutdown hooks).
</Tip>

```python
async def batch_operations():
    memory = AsyncMemory()

    tasks = [
        memory.add(
            messages=[{"role": "user", "content": f"Message {i}"}],
            user_id=f"user_{i}"
        )
        for i in range(5)
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} completed successfully")
```

<Info icon="check">
  When concurrency works correctly, successful tasks return memory IDs while failures surface as exceptions in the `results` list.
</Info>

### Add resilience with retries

```python
import asyncio
from mem0 import AsyncMemory

async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
        except Exception as exc:
            print(f"Error on attempt {attempt + 1}: {exc}")

        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)

    raise Exception(f"Operation failed after {max_retries} attempts")

async def robust_memory_search():
    memory = AsyncMemory()

    async def search_operation():
        return await memory.search("test query", user_id="alice")

    return await with_timeout_and_retry(search_operation)
```

<Warning>
  Always cap retries—runaway loops can keep the event loop busy and block other tasks.
</Warning>

---

## See it in action

### Core operations

```python
# Create memories
result = await memory.add(
    messages=[
        {"role": "user", "content": "I'm travelling to SF"},
        {"role": "assistant", "content": "That's great to hear!"}
    ],
    user_id="alice"
)

# Search memories
results = await memory.search(
    query="Where am I travelling?",
    user_id="alice"
)

# List memories
all_memories = await memory.get_all(user_id="alice")

# Get a specific memory
specific_memory = await memory.get(memory_id="memory-id-here")

# Update a memory
updated_memory = await memory.update(
    memory_id="memory-id-here",
    data="I'm travelling to Seattle"
)

# Delete a memory
await memory.delete(memory_id="memory-id-here")

# Delete scoped memories
await memory.delete_all(user_id="alice")
```

<Info icon="check">
  Confirm each call returns the same response fields as the synchronous client (IDs, `results`, or confirmation objects). Missing keys usually mean the coroutine wasn’t awaited.
</Info>

<Note>
  `delete_all` requires at least one of `user_id`, `agent_id`, or `run_id`. Provide all three to narrow deletion to a single session.
</Note>

### Scoped organization

```python
await memory.add(
    messages=[{"role": "user", "content": "I prefer vegetarian food"}],
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001"
)

all_user_memories = await memory.get_all(user_id="alice")
agent_memories = await memory.get_all(user_id="alice", agent_id="diet-assistant")
session_memories = await memory.get_all(user_id="alice", run_id="consultation-001")
specific_memories = await memory.get_all(
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001"
)

history = await memory.history(memory_id="memory-id-here")
```

<Tip>
  Use `history` when you need audit trails for compliance or debugging update logic.
</Tip>

### Blend with other async APIs

```python
import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory

async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()

async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
    search_result = await async_memory.search(query=message, user_id=user_id, limit=3)
    relevant_memories = search_result["results"]
    memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories)

    system_prompt = (
        "You are a helpful AI. Answer the question based on query and memories.\n"
        f"User Memories:\n{memories_str}"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message},
    ]

    response = await async_openai_client.chat.completions.create(
        model="gpt-4.1-nano-2025-04-14",
        messages=messages
    )

    assistant_response = response.choices[0].message.content
    messages.append({"role": "assistant", "content": assistant_response})
    await async_memory.add(messages, user_id=user_id)

    return assistant_response
```

<Info icon="check">
  When everything is wired correctly, the OpenAI response should incorporate recent memories and the follow-up `add` call should persist the new assistant turn.
</Info>

### Handle errors gracefully

```python
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig

async def handle_initialization_errors():
    try:
        config = MemoryConfig(
            vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
            llm={"provider": "openai", "config": {"model": "gpt-4.1-nano-2025-04-14"}}
        )
        AsyncMemory(config=config)
        print("AsyncMemory initialized successfully")
    except ValueError as err:
        print(f"Configuration error: {err}")
    except ConnectionError as err:
        print(f"Connection error: {err}")

async def handle_memory_operation_errors():
    memory = AsyncMemory()
    try:
        await memory.get(memory_id="non-existent-id")
    except ValueError as err:
        print(f"Invalid memory ID: {err}")

    try:
        await memory.search(query="", user_id="alice")
    except ValueError as err:
        print(f"Invalid search query: {err}")
```

<Warning>
  Catch and log `ValueError` exceptions from invalid inputs—async stack traces can otherwise disappear inside background tasks.
</Warning>

### Serve through FastAPI

```python
from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory

app = FastAPI()
memory = AsyncMemory()

@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
    try:
        result = await memory.add(messages=messages, user_id=user_id)
        return {"status": "success", "data": result}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
    try:
        result = await memory.search(query=query, user_id=user_id, limit=limit)
        return {"status": "success", "data": result}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
```

<Tip>
  Create one `AsyncMemory` instance per process when using FastAPI—startup hooks are a good place to configure and reuse it.
</Tip>

### Instrument logging

```python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_async_operation(operation_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            logger.info(f"Starting {operation_name}")
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"{operation_name} completed in {duration:.2f}s")
                return result
            except Exception as exc:
                duration = time.time() - start_time
                logger.error(f"{operation_name} failed after {duration:.2f}s: {exc}")
                raise
        return wrapper
    return decorator

@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
    return await memory.add(messages=messages, user_id=user_id)
```

<Info icon="check">
  Logged durations give you the baseline needed to spot regressions once AsyncMemory is in production.
</Info>

---

## Verify the feature is working

- Run a quick add/search cycle and confirm the returned memory content matches your input.
- Inspect application logs to ensure async tasks complete without blocking the event loop.
- In FastAPI or other frameworks, hit health endpoints to verify the shared client handles concurrent requests.
- Monitor retry counters—unexpected spikes indicate configuration or connectivity issues.

---

## Best practices

1. **Keep operations awaited:** Forgetting `await` is the fastest way to miss writes—lint for it or add helper wrappers.
2. **Scope deletions carefully:** Always supply `user_id`, `agent_id`, or `run_id` to avoid purging too much data.
3. **Batch writes thoughtfully:** Use `asyncio.gather` for throughput but cap concurrency based on backend capacity.
4. **Log errors with context:** Capture user and agent scopes to triage failures quickly.
5. **Reuse clients:** Instantiate `AsyncMemory` once per worker to avoid repeated backend handshakes.

---

## Troubleshooting

| Issue | Possible causes | Fix |
| --- | --- | --- |
| Initialization fails | Missing dependencies, invalid config | Validate `MemoryConfig` settings and environment variables. |
| Slow operations | Large datasets, network latency | Cache heavy queries and tune vector store parameters. |
| Memory not found | Invalid ID or deleted record | Check ID source and handle soft-deleted states. |
| Connection timeouts | Network issues, overloaded backend | Apply retries/backoff and inspect infrastructure health. |
| Out-of-memory errors | Oversized batches | Reduce concurrency or chunk operations into smaller sets. |

---

<CardGroup cols={2}>
  <Card title="Master Memory Operations" icon="layers" href="/core-concepts/memory-operations/add">
    Review how add, search, update, and delete behave across synchronous and async clients.
  </Card>
  <Card title="Connect Async Agents" icon="plug" href="/cookbooks/integrations/openai-tool-calls">
    Follow a full workflow that mixes AsyncMemory with OpenAI tool-call automation.
  </Card>
</CardGroup>
